package com.atguigu.flink.sql.connector;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: reading a changelog stream from Kafka with the Flink SQL
 * 'upsert-kafka' connector and printing it to stdout.
 */
public class Demo9_ReadUpsertKafka {

    public static void main(String[] args) {

        // Build a streaming-mode table environment; no DataStream bridge is needed here.
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(envSettings);

        /*
         * The topic stores the accumulated vc per sensor id.
         *
         * Note on 'scan.startup.mode' (the start-position strategy used when no
         * offsets are found in state): it is NOT supported by the upsert-kafka
         * connector. upsert-kafka always reads the topic from the beginning and
         * does not allow a custom start position, because it materializes a
         * changelog stream (with updates and deletes) — only by reading from
         * the earliest offset can the full change history be reconstructed.
         */
        String ddl = "CREATE TABLE t1 (" +
            "  id STRING PRIMARY KEY NOT ENFORCED ," +
            "  vc INT " +
            ")  WITH (" +
            " 'connector' = 'upsert-kafka'," +
            "  'topic' = 't5'," +
            "  'properties.bootstrap.servers' = 'hadoop102:9092'," +
            "  'properties.group.id' = 'testGroup'," +
            "  'key.format' = 'json' ," +
            "  'value.format' = 'json' " +
            ")";

        // Register the table, then run a continuous query and print the changelog.
        tEnv.executeSql(ddl);
        tEnv.sqlQuery("select * from t1  ").execute().print();
    }
}
